﻿#include <QtConcurrent/QtConcurrent>

#include "ZmqSendReplyServer.h"
#include "WaitUtils.h"
// Base unit for string/buffer sizing in this file; Connect() sizes its
// receive buffer as CHAR_MAX_LENGTH*10 bytes.
#define CHAR_MAX_LENGTH        1024    // Maximum length of the strings in use

/**
 * @brief Constructs the reply-side endpoint without creating any ZeroMQ resources.
 *
 * Marks the receive loop as allowed to run and allocates the mutex and wait
 * condition that SendNoReply() and the worker started by Connect() use to hand
 * replies over.
 *
 * @param parent QObject parent forwarded to BaseSendReply.
 */
ZmqSendReplyServer::ZmqSendReplyServer(QObject* parent):BaseSendReply(parent)
{
    _isRunning=true;

    // Start from null handles: Dispose() passes these straight to
    // zmq_close()/zmq_ctx_destroy(), which reject a null handle with an error
    // but would dereference an indeterminate one.
    _socket=nullptr;
    _context=nullptr;

    // NOTE(review): no matching delete for these two is visible in this file;
    // confirm the destructor (declared elsewhere) releases them.
    _mutex=new QMutex();
    _waitCondition=new QWaitCondition();
}

bool ZmqSendReplyServer::Connect()
{
    _context=zmq_ctx_new();
    //reply方
    _socket=zmq_socket(_context,ZMQ_REP);
    int rc=zmq_bind(_socket,_communicationParam.toUtf8());
    qDebug()<<__FUNCTION__<<__LINE__<<_communicationParam.toUtf8()<<rc;
    if(rc!=0)
    {
        return false;
    }

    int timeout=3000;
    if(zmq_setsockopt(_socket, ZMQ_RCVTIMEO, &timeout, sizeof(int)) < 0)
    {
        qDebug()<<"Failed to setsocketopt ZMQ_RCVTIMEO error";
    }

    QtConcurrent::run([this]()
    {
        while(_isRunning)
        {
            WaitUtils::WaitMsNoException(200);
            char receivetMsg[CHAR_MAX_LENGTH*10] = {'\0'};
            int len = zmq_recv(_socket, receivetMsg, sizeof(receivetMsg),0);
            if(len<=0)
            {
                continue;
            }
            QByteArray data(receivetMsg,len);
            emit DataReceive(data);
            _mutex->lock();
            if(_waitCondition->wait(_mutex,3000))
            {
                while(!_sendQueue.empty())
                {
                    QByteArray sendBuff=_sendQueue.dequeue();
                    int len=sendBuff.length();
                    bool sendret=zmq_send(_socket,sendBuff.data(),len,0)==len;
//                    qDebug()<<__FUNCTION__<<__LINE__<<sendret;
                }
            }
            else
            {
                char reply[60] = "receive data ok but response timeout";
                zmq_send(_socket,reply,sizeof(reply),0);
            }
            _mutex->unlock();
        }
    });
    return true;
}

/**
 * @brief Stops the receive loop and releases the ZeroMQ socket and context.
 *
 * Each handle is reset to nullptr after it is released, so a repeated call does
 * not hand an already-released handle back to libzmq.
 *
 * NOTE(review): the worker started by Connect() may still be using _socket when
 * this runs; this file holds no handle to wait for it. Confirm callers tolerate
 * that, or add a join in the class.
 */
void ZmqSendReplyServer::Dispose()
{
    // Ask the worker loop to exit at its next iteration.
    _isRunning=false;

    if(_socket!=nullptr)
    {
        zmq_close(_socket);
        _socket=nullptr;
    }

    if(_context!=nullptr)
    {
        zmq_ctx_destroy(_context);
        _context=nullptr;
    }
}

/**
 * @brief Queues a reply and wakes one thread waiting on the reply condition.
 *
 * The buffer is appended to _sendQueue under _mutex; the worker started by
 * Connect() waits on _waitCondition and drains the queue once woken.
 *
 * @param sendBuff Bytes to hand to the worker as the reply payload.
 * @return Always true; the value only reports that the buffer was queued,
 *         not that it was transmitted.
 */
bool ZmqSendReplyServer::SendNoReply(QByteArray sendBuff)
{
    {
        // Scoped guard: released at the closing brace, after the wakeup.
        QMutexLocker guard(_mutex);
        _sendQueue.enqueue(sendBuff);
        _waitCondition->wakeOne();
    }
    return true;
}
